/*
 * ------------------------------------------------------------------------
 *
 *  Copyright by KNIME AG, Zurich, Switzerland
 *  Website: http://www.knime.com; Email: contact@knime.com
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License, Version 3, as
 *  published by the Free Software Foundation.
 *
 *  This program is distributed in the hope that it will be useful, but
 *  WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, see <http://www.gnu.org/licenses>.
 *
 *  Additional permission under GNU GPL version 3 section 7:
 *
 *  KNIME interoperates with ECLIPSE solely via ECLIPSE's plug-in APIs.
 *  Hence, KNIME and ECLIPSE are both independent programs and are not
 *  derived from each other. Should, however, the interpretation of the
 *  GNU GPL Version 3 ("License") under any applicable laws result in
 *  KNIME and ECLIPSE being a combined program, KNIME AG herewith grants
 *  you the additional permission to use and propagate KNIME together with
 *  ECLIPSE with only the license terms in place for ECLIPSE applying to
 *  ECLIPSE and the GNU GPL Version 3 applying for KNIME, provided the
 *  license terms of ECLIPSE themselves allow for the respective use and
 *  propagation of ECLIPSE together with KNIME.
 *
 *  Additional permission relating to nodes for KNIME that extend the Node
 *  Extension (and in particular that are based on subclasses of NodeModel,
 *  NodeDialog, and NodeView) and that only interoperate with KNIME through
 *  standard APIs ("Nodes"):
 *  Nodes are deemed to be separate and independent programs and to not be
 *  covered works.  Notwithstanding anything to the contrary in the
 *  License, the License does not apply to Nodes, you are not required to
 *  license Nodes under the License, and you are granted a license to
 *  prepare and propagate Nodes, in each case even if such Nodes are
 *  propagated with or for interoperation with KNIME.  The owner of a Node
 *  may freely choose the license terms applicable to such Node, including
 *  when such Node is propagated with or for interoperation with KNIME.
 * ---------------------------------------------------------------------
 *
 * History
 *   Jun 21, 2020 (carlwitt): created
 */
package org.knime.core.data.join.implementation;

import java.util.Optional;
import java.util.function.Supplier;

import org.knime.core.data.DataCell;
import org.knime.core.data.DataRow;
import org.knime.core.data.container.CloseableRowIterator;
import org.knime.core.data.join.JoinSpecification;
import org.knime.core.data.join.JoinSpecification.InputTable;
import org.knime.core.data.join.JoinTableSettings;
import org.knime.core.data.join.results.JoinResult;
import org.knime.core.data.join.results.JoinResult.Output;
import org.knime.core.data.join.results.RowHandlerCancelable;
import org.knime.core.node.BufferedDataTable;
import org.knime.core.node.CanceledExecutionException;
import org.knime.core.node.CanceledExecutionException.CancelChecker;
import org.knime.core.node.ExecutionContext;
import org.knime.core.node.InvalidSettingsException;

/**
 * Implements a nested loop join that can have an extremely small memory footprint, at the cost of additional
 * iterations over the probe input. Used if the {@link HybridHashJoin} has to flush rows to disk because of scarce heap
 * memory. In this case, input tables are split into {@link DiskBucket}s which are joined using {@link BlockHashJoin}.
 * Ideally, one of the table partitions is small enough such that {@link BlockHashJoin} can index it in memory and needs
 * only a single pass over the other partition.
 *
 * <h1>Usage</h1>
 *
 * {@link DiskBucket}s store only the columns of the input tables that are necessary for joining (working table format).
 * Thus, joining the disk buckets is a little bit different from joining the original input tables. The required
 * {@link JoinSpecification} is derived from the original one by reusing its join and include column names and swapping
 * out the original input tables for the working tables, see
 * {@link JoinSpecification#specWith(JoinTableSettings, JoinTableSettings)}.
 *
 * <h1>Internals</h1>
 *
 * The smaller table is considered the hash input, the other table becomes the probe input. An outer loop performs a
 * single pass over the rows of the hash input. The rows are indexed in a {@link HashIndex} until memory is running low.
 * Then, a complete pass over the probe input is performed and the {@link HashIndex} is discarded. The outer loop then
 * continues to index hash input rows and does more passes over the probe input every time memory is running low. After
 * the last hash input row has been indexed, one final pass over the probe input processes the remaining index contents.
 * <br/>
 * <br/>
 *
 * Note that when joining two {@link DiskBucket}s P and H originally from the probe table and hash table, P can become
 * the hash input if fewer rows from the original probe input have been mapped to P than rows from the original hash
 * input to H.
 *
 * @author Carl Witt, KNIME AG, Zurich, Switzerland
 */
@SuppressWarnings("javadoc")
class BlockHashJoin extends JoinImplementation {

    /** Progress message shown while rows of the hash input (the smaller table) are being indexed. */
    private static final String INDEXING_MESSAGE = "Indexing smaller table.";

    /** Progress message shown while a full pass over the probe input (the larger table) is performed. */
    private static final String PROBE_PASS_MESSAGE = "Single pass over larger table.";

    /**
     * @param joinSpecification describes the input tables and the join criteria
     * @param exec used for progress reporting and cancellation checks
     */
    BlockHashJoin(final JoinSpecification joinSpecification, final ExecutionContext exec) {
        super(joinSpecification, exec);
    }

    /**
     * Joins the two input tables of the join specification by indexing blocks of hash input rows in memory and
     * performing one complete pass over the probe input per block.
     *
     * <pre>
     * algorithm overview:
        current hash index = empty index
        1. build phase for the hash index
         1.a keep adding rows from the hash side input table to the current hash index
         1.b if memory runs low
            - signal to the result container that false positive unmatched probe rows may occur
            - execute 2.
            - replace the current hash index with an empty new one to save memory
            - proceed with 1.
        2. probe phase
         - iterate over all rows in the probe side input table, looking up their match partners in the current index
         - collect the unmatched rows in the hash index (because they have seen all probe rows and didn't match)
           - in the match any case, the unmatched hash rows are also collected in deferred manner
             (because results.deferUnmatchedRows was called for both tables)
        3. once all hash side rows have been indexed, execute 2. one last time for the current hash index
     * </pre>
     *
     * Disjunctive join criteria with more than one clause are delegated to {@code matchAny}, which uses multiple calls
     * to this method. If at least one input table is absent, no join is performed; the rows of a single present table
     * are added as unmatched rows if that table's settings retain unmatched rows.
     *
     * @param results where to put join results (matches and unmatched rows)
     * @return {@code results}, after the output of this join has been added to it
     * @throws CanceledExecutionException if execution is canceled
     * @throws InvalidSettingsException may be propagated from the disjunctive join handling ({@code matchAny})
     */
    @Override
    public <T extends Output> JoinResult<T> join(final JoinResult<T> results)
        throws CanceledExecutionException, InvalidSettingsException {

        // matchAny will use multiple calls to this method to perform the disjunctive join
        if (!m_joinSpecification.isConjunctive() && m_joinSpecification.getNumJoinClauses() > 1) {
            return matchAny(BlockHashJoin::new, results);
        }

        // if only one of the input tables is present, add its rows to the unmatched results
        if (incompleteInput(m_joinSpecification, results)) {
            return results;
        }

        final InputTable hashSide = HashIndex.smallerTable(m_joinSpecification);
        final InputTable probeSide = hashSide.other();

        final JoinTableSettings hashSettings = m_joinSpecification.getSettings(hashSide);
        final JoinTableSettings probeSettings = m_joinSpecification.getSettings(probeSide);

        // both tables are present at this point, otherwise incompleteInput would have returned true
        final BufferedDataTable probe = probeSettings.getTable()
            .orElseThrow(() -> new IllegalStateException("Probe input table (" + probeSide + ") is missing."));
        final BufferedDataTable hash = hashSettings.getTable()
            .orElseThrow(() -> new IllegalStateException("Hash input table (" + hashSide + ") is missing."));

        // after each pass, the rows in the hash index have been compared to all rows in the probe table; thus what's
        // unmatched now is definitely unmatched and can be added to the results
        // if the join problem we're solving here comes from a partition, the row offsets have changed.
        // in case they matter, we can extract them from an auxiliary column added to the rows
        final RowHandlerCancelable unmatchedHashRows = results.unmatched(hashSide);

        // this is an incomplete index, as it represents only the hash rows indexed in one pass over the probe input
        final Supplier<HashIndex> newHashIndex =
            () -> new HashIndex(m_joinSpecification, results, hashSide, m_progress::isCanceled);

        // this may be a partial index (if memory runs low) and thus may be replaced with an index covering the next
        // rows of the hash input
        HashIndex index = newHashIndex.get();

        getProgress().setMessage(INDEXING_MESSAGE);

        // grab and index as many hash input rows as possible (ideally all)
        try (CloseableRowIterator hashRows = hash.iterator()) {

            long rowOffset = 0;
            while (hashRows.hasNext()) {

                final DataRow hashRow = hashRows.next();
                final DataCell[] joinAttributeValues = hashSettings.get(hashRow);
                index.addHashRow(joinAttributeValues, hashRow, rowOffset);

                // if memory is running low, do a pass over the probe input to be able to clear the hash index
                if (m_progress.isMemoryLow(100)) {
                    // since we're doing several passes over the probe side, we might get false positive unmatched rows
                    // on the probe side (because we're searching for match partners in an incomplete index)
                    results.deferUnmatchedRows(probeSide);

                    // try to free memory, e.g., by switching from caching unmatched rows to just marking unmatched rows
                    // and collecting them later
                    results.lowMemory();

                    // process probe input once to be able to clear out the current hash index
                    singlePass(probe, index, unmatchedHashRows);
                    index = newHashIndex.get();

                    // singlePass replaced the progress message, but indexing of the hash input continues
                    getProgress().setMessage(INDEXING_MESSAGE);
                }

                m_progress.setProgressAndCheckCanceled(1.0 * rowOffset / hash.size());

                rowOffset++;

            } // all hash input rows indexed

        } // close hash input row iterator

        // process pending hash index contents
        singlePass(probe, index, unmatchedHashRows);

        return results;
    }

    /**
     * Performs one complete pass over the probe input, joining every probe row against the given hash index. Afterwards,
     * the hash rows in the index have been compared to all probe rows, so those still unmatched are handed to
     * {@code unmatchedHashRows}.
     *
     * @param probe the probe input table
     * @param partialIndex index over (a block of) the hash input rows
     * @param unmatchedHashRows receives the hash rows in {@code partialIndex} that found no match partner
     * @throws CanceledExecutionException if execution is canceled during the pass
     */
    private void singlePass(final BufferedDataTable probe, final HashIndex partialIndex,
        final RowHandlerCancelable unmatchedHashRows) throws CanceledExecutionException {

        getProgress().setMessage(PROBE_PASS_MESSAGE);

        final CancelChecker checkCanceled =
            CancelChecker.checkCanceledPeriodicallyWithProgress(m_exec, 100, probe.size());
        JoinResult.enumerateWithResources(probe, partialIndex::joinSingleRow, checkCanceled);

        partialIndex.forUnmatchedHashRows(unmatchedHashRows);
    }

    /**
     * Checks whether both input tables are present. If exactly one table is present, its rows are added to the
     * container as unmatched rows, provided that table's settings retain unmatched rows. If neither table is present,
     * nothing is added.
     *
     * @param joinSpecification contains the two input tables
     * @param container where to put unmatched rows
     * @return true if at least one of the input tables is absent, i.e., no join needs to be performed
     * @throws CanceledExecutionException if execution is canceled while adding unmatched rows
     */
    private <T extends Output> boolean incompleteInput(final JoinSpecification joinSpecification,
        final JoinResult<T> container) throws CanceledExecutionException {

        if (!joinSpecification.getSettings(InputTable.LEFT).hasTable()
            && !joinSpecification.getSettings(InputTable.RIGHT).hasTable()) {
            return true;
        }

        for (InputTable presentSide : InputTable.both()) {
            final JoinTableSettings present = joinSpecification.getSettings(presentSide);
            final JoinTableSettings absent = joinSpecification.getSettings(presentSide.other());
            final Optional<BufferedDataTable> presentTable = present.getTable();
            if (presentTable.isPresent() && !absent.getTable().isPresent()) {
                // collect rows from present table as unmatched
                if (present.isRetainUnmatched()) {
                    JoinResult.enumerateWithResources(presentTable.get(), container.unmatched(presentSide),
                        CancelChecker.checkCanceledPeriodically(m_exec));
                }
                // only one table is present.
                return true;
            }
        }
        return false;
    }

}
